
package com.cygsunri.wisdompark.tuyamq;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.cygsunri.common.EnumConstant;
import com.cygsunri.common.TuYaConstant;
import com.cygsunri.wisdompark.callback.service.ProcessingDataService;
import com.cygsunri.wisdompark.tuyamq.vo.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;


/**
 * Startup runner that consumes messages from the Tuya message queue.
 *
 * <p>When the {@code init.tuyamq} property is {@code true}, a dedicated thread
 * is started that builds an {@link MqConsumer} and begins consuming. Each
 * message envelope is parsed into a {@link MessageVO}, its {@code data} field
 * is AES-decrypted, and the result is dispatched on the envelope's protocol:
 * device data reports are persisted through {@link ProcessingDataService};
 * equipment changes and warnings are only logged.
 */
@Slf4j
@Component
@Order(value=1)
@PropertySource("classpath:configConstant.properties")
public class ConsumerMq implements CommandLineRunner {

    @Autowired
    private ProcessingDataService processingDataService;
    @Autowired
    private TuYaConstant tuYaConstant;
    /** Switch read from {@code init.tuyamq}; the consumer is only started when it is {@code true}. */
    @Value("${init.tuyamq}")
    private boolean inittuyamq;

    /**
     * Starts the consumer on its own thread so that this runner returns
     * immediately instead of waiting on {@code MqConsumer.start()}.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        if (inittuyamq) {
            new Thread(this::consumerMqAccept, "tuya-mq-consumer").start();
        }
    }

    /**
     * Builds the consumer, registers the message listener and starts it.
     *
     * <p>Exceptions thrown by {@code start()} are logged rather than propagated,
     * because this method runs as the body of the consumer thread.
     */
    private void consumerMqAccept() {
        String url = MqConfigs.CN_SERVER_URL;
        MqConsumer mqConsumer = MqConsumer.build()
                .serviceUrl(url)
                .accessId(tuYaConstant.getAccessId())
                .accessKey(tuYaConstant.getAccessSecret())
                .maxRedeliverCount(3)
                .messageListener(message -> {
                    log.info("------------------------ Start  tuya-mq----------------------------");
                    // Decode once, with an explicit charset, instead of relying on the platform default.
                    String jsonMessage = new String(message.getData(), StandardCharsets.UTF_8);
                    log.info("Message received:{},seq={},time={},consumed time={}",
                            jsonMessage, message.getSequenceId(), message.getPublishTime(),
                            System.currentTimeMillis());

                    MessageVO vo = JSON.parseObject(jsonMessage, MessageVO.class);
                    if (vo == null) {
                        // An empty/"null" payload parses to null; retrying cannot make it valid.
                        log.warn("tuya-mq message could not be parsed into MessageVO, skipped");
                        log.info("------------------------ end  tuya-mq----------------------------");
                        return;
                    }

                    // The AES key is characters [8, 24) of the access secret.
                    String data = AESBase64Utils.decrypt(vo.getData(), tuYaConstant.getAccessSecret().substring(8, 24));
                    vo.setData(data);
                    log.info("MessageVO:{}", JSON.toJSONString(vo));

                    // Null-safe comparison: a missing protocol is treated like an unknown one.
                    Object protocol = vo.getProtocol();
                    if (Objects.equals(protocol, EnumConstant.tuyaProtocol.DATA_REPORTED.getValue())) {
                        // Device data report event
                        ReportDataMessageVO reportDataMessageVO = JSON.parseObject(vo.getData(), ReportDataMessageVO.class);
                        log.info("ReportDataMessageVO:{}", JSON.toJSONString(reportDataMessageVO));
                        if (reportDataMessageVO == null) {
                            log.warn("tuya-mq data report has no parsable body, skipped");
                        } else if (CollectionUtils.isNotEmpty(reportDataMessageVO.getStatus())) {
                            // Flatten the status entries into code -> value; "time" ends up
                            // holding the timestamp of the last entry iterated.
                            Map<String, Object> map = new HashMap<>();
                            for (ReportDataStatuDataVO temp : reportDataMessageVO.getStatus()) {
                                map.put(temp.getCode(), temp.getValue());
                                map.put("time", temp.getT());
                            }
                            processingDataService.saveData(reportDataMessageVO.getDevId(), map,
                                    EnumConstant.saveSource.tuya.getValue());
                        }
                    } else if (Objects.equals(protocol, EnumConstant.tuyaProtocol.EQUIPMENT_CHANGES.getValue())) {
                        // Equipment change event: logged only
                        EquipmentVariationMessageVO equipmentVariationMessageVO =
                                JSON.parseObject(vo.getData(), EquipmentVariationMessageVO.class);
                        log.info("EquipmentVariationMessageVO:{}", JSON.toJSONString(equipmentVariationMessageVO));
                    } else if (Objects.equals(protocol, EnumConstant.tuyaProtocol.WARNNING.getValue())) {
                        // Outdoor warning event: logged only
                        WarnningMessageVO warnningMessageVO = JSON.parseObject(vo.getData(), WarnningMessageVO.class);
                        log.info("WarnningMessageVO:{}", JSON.toJSONString(warnningMessageVO));
                    }
                    log.info("------------------------ end  tuya-mq----------------------------");
                });
        try {
            mqConsumer.start();
        } catch (Exception e) {
            // Route the failure through the logger instead of printStackTrace().
            log.error("Failed to start tuya-mq consumer", e);
        }
    }

}
